Homework 6, Spring 2022
Name: Anitha Ganapathy
Email: aganapa@iu.edu
Setting up the imports.
# !pip install ipympl
# Import the necessary libraries
from mpl_toolkits.mplot3d import Axes3D
from google.colab import output
output.enable_custom_widget_manager()
from PIL import Image
# from scipy.misc import imread
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# import matplotlib.mlab as mlab
import librosa
import soundfile as sf
from IPython.display import Audio
import cmath, math
import scipy.io # to read .mat files
import seaborn as sns
from numpy.core.fromnumeric import size
import scipy
from scipy.spatial.distance import pdist, squareform
from scipy import exp
from numpy.linalg import eigh
# Colab-specific setup: inline plotting and Google Drive mount for data access.
%matplotlib inline
from google.colab import drive
drive.mount('/content/drive')
# All data files for this homework live in this Drive folder.
dirpath = '/content/drive/MyDrive/MLSP_data/'
!ls $dirpath
# Load the left/right stereo image pair (PPM) for disparity estimation.
X_L = plt.imread(dirpath + "im0.ppm")
X_R = plt.imread(dirpath + "im8.ppm")
print(f"Left Image shape : {X_L.shape}, Right Image shape : {X_R.shape}")
# Show the two views side by side.
fig = plt.figure(figsize=(18, 8))
fig.add_subplot(1, 2, 1)
plt.imshow(X_L)
plt.axis('off')
plt.title("Left", fontsize = 20)
fig.add_subplot(1, 2, 2)
plt.imshow(X_R)
plt.axis('off')
plt.title("Right", fontsize = 20)
print(f"Left Image shape : {X_L.shape}, Right Image shape : {X_R.shape}")
def similar_of_40_pixels(i, j, left=None, right=None, max_disparity=40):
    """Return the disparity k (0..max_disparity-1) whose left-image pixel
    left[i, j + k] is closest to the right-image pixel right[i, j].

    `left` / `right` default to the module-level X_L / X_R images, so the
    original call signature (i, j) keeps working; passing images explicitly
    makes the function reusable and testable.
    """
    if left is None:
        left = X_L
    if right is None:
        right = X_R
    best_k = 0
    best_distance = np.inf
    for k in range(max_disparity):
        # BUG FIX: cast to float before subtracting. The images are uint8,
        # so the original unsigned subtraction wrapped modulo 256
        # (e.g. 10 - 20 -> 246) and corrupted the distance.
        diff = right[i, j, :].astype(float) - left[i, j + k, :].astype(float)
        # Same metric as the original: sqrt of the summed absolute differences.
        distance = np.sqrt(np.sum(np.abs(diff)))
        if distance < best_distance:
            best_distance = distance
            best_k = k
    return best_k
# Disparity matrix: for each pixel (i, j), the best matching horizontal
# offset (0..39) into the left image. The last 40 columns are dropped so
# the search window never runs off the image.
D = np.zeros((X_L.shape[0], X_L.shape[1] - 40), dtype=int)
print(D.shape)
# Calculating the Disparity Matrix
for i in range(D.shape[0]):
for j in range(D.shape[1]):
D[i, j ] = similar_of_40_pixels(i, j)
print(D)
print(D.shape)
D[:,:]
# Flatten for histogramming and 1-D clustering.
D_vec = D.flatten()
print(f'D_vec shape is {D_vec.shape}')
# Compare histograms of the disparity values at two bin resolutions to
# judge the number of natural clusters.
fig = plt.figure(figsize=(18,5))
fig.add_subplot(1, 2, 1)
plt.hist(D_vec, bins=40)
plt.title("Histogram of vectorized Disparity matrix, bin = 40", fontsize = 15)
fig.add_subplot(1, 2, 2)
plt.hist(D_vec, bins=50)
plt.title("Histogram of vectorized Disparity matrix, bin = 50", fontsize = 15)
plt.show()
With 50 bins, roughly 11 clusters can be seen forming in the histogram.
def pdf_dist_normal(mu, variance, x):
    """Evaluate the univariate normal pdf N(mu, variance) at x."""
    coeff = 1.0 / np.sqrt(2.0 * np.pi * variance)
    exponent = -((x - mu) ** 2) / (2.0 * variance)
    return coeff * np.exp(exponent)
# Wrap the vectorized disparities in a DataFrame for clustering/plotting.
df = pd.DataFrame({'Disparity': D_vec})
df.head()
from scipy.stats import multivariate_normal
class GMM:
    """Gaussian Mixture Model fit with the EM algorithm.

    Parameters
    ----------
    k : int
        Number of Gaussian components.
    max_iter : int
        Number of alternating E/M iterations run by fit().
    """

    def __init__(self, k, max_iter=5):
        self.k = k
        self.max_iter = int(max_iter)

    def initialize(self, X):
        # Cache training shape; responsibilities start uniform over components.
        self.shape = X.shape
        self.n, self.m = self.shape
        # phi: mixing proportions; weights: per-sample responsibilities.
        self.phi = np.full(shape=self.k, fill_value=1/self.k)
        self.weights = np.full(shape=self.shape, fill_value=1/self.k)
        # Means: k randomly chosen data rows; covariances: full-data covariance.
        random_row = np.random.randint(low=0, high=self.n, size=self.k)
        self.mu = [X[row_index, :] for row_index in random_row]
        self.sigma = [np.cov(X.T) for _ in range(self.k)]

    def e_step(self, X):
        # E-step: update responsibilities and phi, holding mu and sigma fixed.
        self.weights = self.predict_proba(X)
        self.phi = self.weights.mean(axis=0)

    def m_step(self, X):
        # M-step: re-estimate each component's mean and covariance from the
        # responsibility-weighted data, holding phi and weights fixed.
        for i in range(self.k):
            weight = self.weights[:, [i]]
            total_weight = weight.sum()
            self.mu[i] = (X * weight).sum(axis=0) / total_weight
            self.sigma[i] = np.cov(X.T,
                                   aweights=(weight/total_weight).flatten(),
                                   bias=True)

    def fit(self, X):
        """Run max_iter EM iterations starting from a random initialization."""
        self.initialize(X)
        for iteration in range(self.max_iter):
            self.e_step(X)
            self.m_step(X)

    def predict_proba(self, X):
        """Posterior responsibility of each component for each sample of X."""
        # BUG FIX: size the likelihood by X itself rather than the cached
        # training length self.n, so prediction works on inputs of any size,
        # not only the exact data passed to fit().
        likelihood = np.zeros((np.shape(X)[0], self.k))
        for i in range(self.k):
            distribution = multivariate_normal(
                mean=self.mu[i],
                cov=self.sigma[i])
            likelihood[:, i] = distribution.pdf(X)
        # Bayes rule: prior-weighted likelihoods, normalized per sample.
        numerator = likelihood * self.phi
        denominator = numerator.sum(axis=1)[:, np.newaxis]
        weights = numerator / denominator
        return weights

    def predict(self, X):
        """Hard cluster assignment: argmax of the posterior responsibilities."""
        weights = self.predict_proba(X)
        return np.argmax(weights, axis=1)
from scipy.stats import mode
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
# X =D_vec.reshape(-1,1)
np.random.seed(42)
# Fit a 10-component GMM to the vectorized disparities (as a column vector).
gmm = GMM(k=10, max_iter=100)
gmm.fit(D_vec.reshape(-1,1))
# NOTE(review): predict() receives the 1-D D_vec while fit() saw a column
# vector; scipy's pdf happens to accept both shapes here -- confirm.
clus = gmm.predict(D_vec)
df['clus'] = clus
df
df.clus.unique()
# Scatter of disparity value vs. assigned cluster id.
fig = plt.figure(figsize=(8,5))
plt.scatter(df.Disparity, df.clus)
plt.show()
# Bar plot of disparity per pixel index, colored by GMM cluster label.
sns.set(rc={'figure.figsize':(18,5)})
ax = sns.barplot(x=df['Disparity'], y = df.index, hue=df['clus'],
palette='husl', dodge=False)
#place legend outside top right corner of plot
plt.legend(bbox_to_anchor=(1.02, 1), loc='upper left', borderaxespad=0)
plt.title("GMM Clustering", fontsize = 25)
plt.show()
centriods = gmm.mu
sigmas = gmm.sigma
print(centriods)
print(sigmas)
df['depth_map'] = df['clus']
df.head()
# Map each cluster label to its centroid disparity value.
# BUG FIX: do the lookup in one vectorized step from the label array.
# The original replaced values in place (depth_map[depth_map == i] =
# centriods[i]), which double-substitutes whenever a centroid value
# collides with a not-yet-processed label index.
centroid_values = np.asarray(centriods, dtype=float).reshape(-1)
depth_map = centroid_values[np.array(df['clus'], dtype=int)]
df['depth_map'] = depth_map
df.head()
# Plotting the clustered images with the Disparity data and Depth Map.
# NOTE(review): (381, 390) is assumed to equal D.shape (image height x
# (width - 40)) -- confirm against the loaded image dimensions.
plt.figure(figsize=(8,5))
plt.imshow(np.array(df['Disparity']).reshape((381, 390)),cmap="gray", aspect = 'auto')
plt.title("Disparity data", fontsize = 25)
plt.grid(False)
plt.show()
plt.figure(figsize=(8,5))
plt.imshow(np.array(df['depth_map']).reshape((381, 390)),cmap="gray", aspect = 'auto')
plt.title("Depth Map", fontsize = 25)
plt.grid(False)
plt.show()
# plt.figure(figsize=(8,5))
# plt.imshow(np.array(df['clus']).reshape((381, 390)),cmap="gray", aspect = 'auto')
# plt.title("Cluster values", fontsize = 25)
# plt.grid(False)
# plt.show()
Implementation of MRF smoothing priors using an eight-neighborhood system.
# State for the MRF smoothing sweeps: per-sweep snapshots, the label map
# being updated, the label map from the previous sweep (used by the prior),
# and the smoothed depth output.
clus_list = []
smooth_centriod_map = np.array(df.clus).reshape(D.shape)
prev_cluster_map = np.array(df.clus).reshape(D.shape)
smoothened_depth_map = np.zeros(D.shape)
def similarity_pixel_finder(i, j, cluster, cmap=None):
    """Neighbor potential for the MRF smoothing prior.

    Returns 1 when (i, j) lies outside the map or its current label equals
    `cluster`; otherwise a fixed Gibbs penalty exp(-a^2 / (2*sigma^2)) with
    a=5, sigma=0.5 (i.e. exp(-50)). `cmap` defaults to the module-level
    prev_cluster_map for backward compatibility with the original signature.
    """
    if cmap is None:
        cmap = prev_cluster_map
    if i < 0 or \
       j < 0 or \
       i > cmap.shape[0] - 1 or \
       j > cmap.shape[1] - 1 or \
       cmap[i, j] == cluster:
        return 1
    # BUG FIX: the original re-checked cmap[i, j] == cluster here to set
    # a = 0, but that branch was unreachable -- the same equality already
    # returned 1 above. The dead code is removed; behavior is unchanged.
    a = 5
    sigma = 0.5
    return np.exp(-(a * a / (2 * sigma * sigma)))

def prior_calculator(i, j, cluster, cmap=None):
    """MRF smoothing prior for assigning `cluster` at pixel (i, j): the
    product of neighbor potentials over the 3x3 window (eight neighbors
    plus the center pixel itself)."""
    N = [-1, 0, 1]
    prior_p = 1
    for k in N:
        for l in N:
            prior_p *= similarity_pixel_finder(i + k, j + l, cluster, cmap)
    return prior_p
# Rebuild 2-D label and depth maps for the MRF sweeps.
cluster_map = np.array(df.clus)
cluster_map = cluster_map.reshape(D.shape)
depth_map = depth_map.reshape(D.shape)
def pdf_dist_normal(mu, sigma, x):
    """Normal pdf with mean `mu` and standard deviation `sigma` at x.

    NOTE(review): this shadows the earlier variance-based pdf_dist_normal,
    and the smoothing loop passes gmm covariance values (variances) as
    `sigma` -- confirm whether std-dev or variance was intended there.
    """
    var = sigma ** 2
    density = np.exp(-((x - mu) ** 2) / (2.0 * var)) / np.sqrt(2.0 * np.pi * var)
    return density
# Number of distinct cluster labels (and their sorted values, for display).
unique_clus = len(df.clus.unique())
np.sort(df.clus.unique())
%%time
for iter in range(30):
for i in range(D.shape[0]):
for j in range(D.shape[1]):
current_cluster = cluster_map[i,j]
posterior = np.zeros(unique_clus)
for k in range(unique_clus):
posterior[k] = pdf_dist_normal(centriods[k], sigmas[k], depth_map[i,j]) * prior_calculator(i,j,k)
posterior = posterior/np.sum(posterior)
new_label = np.random.choice(np.arange(0, unique_clus), p=posterior)
smoothened_depth_map[i,j] = centriods[new_label]
smooth_centriod_map[i,j] = new_label
clus_list.append(smoothened_depth_map)
prev_cluster_map = np.array(smooth_centriod_map)
c_array = np.array(clus_list)
smoothened_final_map = scipy.stats.mode(c_array[-10:])[0][0]
# Final smoothed depth map after the MRF sweeps.
plt.figure(figsize=(8,15))
plt.title('Depth map after smoothing', fontsize = 25)
plt.imshow(smoothened_final_map,cmap="gray", alpha=0.8)
plt.grid(False)
plt.show()
!ls $dirpath
# Load the four waveforms at their native sample rate (sr=None).
# Presumably trs/trn are training speech/noise and tex/tes are the test
# mixture/reference -- confirm against the assignment description.
trs, sr1 = librosa.load(dirpath + 'trs.wav' , sr = None)
trn, sr1 = librosa.load(dirpath + 'trn.wav' , sr = None)
tex, sr1 = librosa.load(dirpath + 'tex.wav' , sr = None)
tes, sr1 = librosa.load(dirpath + 'tes.wav' , sr = None)
Original S and N, i.e. the sound and noise signals.
# Playback widgets for each of the loaded waveforms.
Audio(trs, rate = sr1)
Audio(trn, rate = sr1)
Audio(tex, rate = sr1)
Audio(tes, rate = sr1)
# 1024-point STFTs with hop 512 (= n_fft/2) and a Hann window.
S = librosa.stft(trs, n_fft=1024, hop_length = 512, window='hann')
N = librosa.stft(trn, n_fft=1024, hop_length = 512, window='hann')
S.shape , N.shape
# BUG FIX: the mixtures were transformed with hop_length=513, inconsistent
# with the 512 used for S/N above and for the istft reconstruction later;
# the mismatch misaligns analysis and synthesis frames.
X = librosa.stft(tex, n_fft=1024, hop_length = 512, window='hann')
X_test = librosa.stft(tes, n_fft=1024, hop_length = 512, window='hann')
X.shape, X_test.shape
Spectrogram of the mixture signal X.
# Plot the signal read from wav file
plt.figure(figsize = (15,10))
plt.subplot(211)
plt.suptitle("tex wav, signal in time domain ", y=1.05, fontsize=25)
plt.title('Spectrogram of a tex wav (S+N)', fontsize=15)
plt.plot(tex)
plt.xlabel('Time')
plt.ylabel('Amplitude')
plt.grid(False)
# NOTE(review): plt.show() here closes the figure, so the subplot(212)
# spectrogram below renders in a separate figure -- confirm intent.
plt.show()
plt.subplot(212)
plt.specgram(tex ,Fs = sr1)
plt.title("tex wav, signal in frequency domain using the spectrogram ", y=1.05, fontsize=18)
plt.xlabel('Time')
plt.ylabel('Frequency')
plt.grid(False)
plt.show()
# Spectrograms of the training sound, training noise, and test mixture.
plt.specgram(trs ,Fs = sr1)
plt.title("Sound ", y=1.05, fontsize=18)
plt.grid(False)
plt.show()
plt.specgram(trn ,Fs = sr1)
plt.title("Noise ", y=1.05, fontsize=18)
plt.grid(False)
plt.show()
plt.specgram(tex ,Fs = sr1)
plt.title("Test Signal (S+N) ", y=1.05, fontsize=18)
plt.grid(False)
plt.show()
# np.set_printoptions(suppress=True)
# df = pd.DataFrame({'S': np.abs(S[:,0]),
# 'N': np.abs(N[:,0]),
# 'X': np.abs(X[:,0])})
# df
# Sanity-check the STFT shapes before running NMF.
print("S shape: ",S.shape)
print("N shape: ",N.shape)
print("X shape: ",X.shape)
print("X_test shape: ",X_test.shape)
%%time
# initialize parameters
k = 30
epochs = 4000
epsilon = 1e-8
B_1 = np.random.randn(513, k)
theta_1 = np.random.randn(k, 893)
print(f'B_1 shape : {B_1.shape}, Theta_1 shape: {theta_1.shape}')
S_mag = np.abs(S)
for i in range(epochs):
# weights update
den = np.dot(B_1,theta_1)
den[den == 0] = epsilon
B_1 = np.multiply(B_1, np.dot((S_mag / den), theta_1.T))
den = np.dot(B_1,theta_1)
den[den == 0] = epsilon
theta_1 = theta_1 * np.dot(B_1.T, (S_mag / den))
# normalization
B_1[B_1 == 0] = epsilon
B_1 = B_1 / np.dot(np.ones([B_1.shape[0], B_1.shape[0]]), B_1)
theta_1 = theta_1 / np.dot(np.ones([theta_1.shape[0], theta_1.shape[0]]), theta_1)
%%time
# initialize parameters for ta
B_2 = np.random.randn(513, k)
theta_2 = np.random.randn(k, 893)
print(f'B_2 shape : {B_2.shape}, Theta_2 shape: {theta_2.shape}')
N_mag = np.abs(N)
for i in range(epochs):
# weights update
den = np.dot(B_2,theta_2)
den[den == 0] = epsilon
B_2 = np.multiply(B_2, np.dot((N_mag / den), theta_2.T))
den = np.dot(B_2, theta_2)
den[den == 0] = epsilon
theta_2 = np.multiply(theta_2, np.dot(B_2.T, (N_mag / den)))
#normalization
B_2[B_2 == 0] = epsilon
B_2 = B_2 / np.dot(np.ones([B_2.shape[0], B_2.shape[0]]), B_2)
theta_2 = theta_2 / np.dot(np.ones([theta_2.shape[0], theta_2.shape[0]]), theta_2)
B_1.shape, B_2.shape , theta_2.shape
print(X.shape)
%%time
B_mat = np.hstack([B_1, B_2])
theta_3 = np.random.randn(B_mat.shape[1], X.shape[1])
print(f'B_mat shape : {B_mat.shape}, Theta_3 shape: {theta_3.shape}')
print("B_mat, theta : ",B_mat.shape, theta_3.shape)
print("B_mat, X :", B_mat.shape, X.shape )
X_mag = np.abs(X)
for i in range(epochs):
# theta_3 update
den = np.dot(B_mat, theta_3)
den[den == 0] = epsilon
theta_3 = np.multiply(theta_3, np.dot(B_mat.T,(X_mag / den)))
theta_3 = theta_3 / np.dot(np.ones([theta_3.shape[0], theta_3.shape[0]]), theta_3)
X_mag.shape, B_mat.shape , theta_3.shape
B_1.shape , theta_3[:k, :].shape
B_mat.shape, theta_3.shape
Estimating the speech spectrogram (S_test_hat) via soft masking.
# Soft mask: the speech share of each time-frequency bin, i.e.
# (B_1 @ theta_speech) over the full reconstruction (B_mat @ theta_3).
num_val = np.dot(B_1 , theta_3[:k, :])
num_val.shape
denom_val = np.dot(B_mat, theta_3)
denom_val.shape
# NOTE(review): the mask is applied to X (the STFT of tex), yet the result
# is named X_test_hat and later compared against X_test -- confirm whether
# theta_3 should instead have been fit on X_test.
X_test_hat = np.multiply(X , num_val / denom_val)
X_test_hat.shape
Recovered Test signal
# Invert the masked STFT back to a waveform and play it.
X_rec = librosa.istft(X_test_hat, hop_length = 512)
Audio(X_rec , rate = sr1)
# S_recov = S_recov.reshape(513, -1)
# S_recov.shape, X_test.shape
def snr_calculate(S, S_recov):
    """Signal-to-noise ratio (dB) of a reconstruction against a reference."""
    signal_power = np.sum(S ** 2)
    noise_power = np.sum((S - S_recov) ** 2)
    return 10 * np.log10(signal_power / noise_power)
# SNR computed on magnitude spectrograms.
X_test_mag = np.abs(X_test)
X_test_hat_mag = np.abs(X_test_hat)
# NOTE(review): np.abs of the dB value hides its sign -- a negative SNR
# would be reported as positive; verify that is intended.
print(f'SNR value of the separation result by comparing is: {np.abs(snr_calculate(X_test_mag, X_test_hat_mag))}')
X_test_hat_mag.shape
# plt.specgram(X_test_hat_mag.flatten() ,Fs = sr1)
# plt.title("Test Signal (S+N) ", y=1.05, fontsize=18)
# plt.grid(False)
# plt.show()
# Load the Twitter topic-modeling data: term-document matrices (Xtr/Xte)
# and label matrices (YtrMat/YteMat; presumably one-hot -- confirm).
twitter = scipy.io.loadmat(dirpath + 'twitter.mat')
Xtr = twitter['Xtr']
Xte = twitter['Xte']
YtrMat = twitter['YtrMat']
YteMat = twitter['YteMat']
Xtr.shape , Xte.shape
YtrMat.shape, YteMat.shape
# NMF of the training term-document matrix Xtr into 50 latent topics.
# BUG FIX: nonnegative initialization (rand, matching the weights2
# initialization below), required by the multiplicative updates; randn
# yields signed values that violate the NMF nonnegativity invariant.
topics = np.random.rand(891, 50)
weights = np.random.rand(50, 773)
for i in range(2000):
    # Multiplicative update for the topic dictionary (zero guard on den).
    den = np.dot(topics,weights)
    den[den == 0] = 0.0001
    topics = topics * np.dot((Xtr/den), weights.T)
    topics = topics / np.dot(np.ones([topics.shape[0], topics.shape[0]]), topics)
    # Multiplicative update for the per-document topic weights.
    den = np.dot(topics,weights)
    den[den == 0] = 0.0001
    weights = weights * np.dot(topics.T, (Xtr/den))
    weights = weights / np.dot(np.ones([weights.shape[0], weights.shape[0]]), weights)
# Infer topic activations for the test documents, keeping the learned
# topic dictionary fixed (only weights2 is updated).
weights2 = np.random.rand(50,193)
for i in range(2000):
den = np.dot(topics, weights2)
den[den == 0] = 0.0001
weights2 = weights2 * np.dot(topics.T, (Xte/den))
weights2 = weights2 / np.dot(np.ones([weights2.shape[0], weights2.shape[0]]), weights2)
Softmax (single-layer perceptron) classifier definition.
# Single-layer softmax classifier over the topic weights, trained with
# batch gradient descent on the cross-entropy loss.
alpha = 0.0005
W = np.random.uniform(0, 6, (3, weights.shape[0]))
b = np.random.uniform(0, 6, (3,1))
error_plot = []
for i in range(5000):
    # Numerically stable softmax: subtracting the per-column max leaves the
    # probabilities unchanged but prevents np.exp overflow.
    Z = np.dot(W, weights) + b
    Y_hat = np.exp(Z - Z.max(axis=0, keepdims=True))
    Y_hat = Y_hat / np.sum(Y_hat, axis=0).reshape(1,-1)
    # Cross-entropy loss and its gradients for softmax regression.
    error = -np.sum(YtrMat * np.log(Y_hat))
    delta_w = np.dot((Y_hat - YtrMat), weights.T)
    delta_b = np.dot((Y_hat - YtrMat), np.ones([YtrMat.shape[1],1]))
    W = W - (alpha * delta_w)
    b = b - (alpha * delta_b)
    error_plot.append(error)
# Apply the trained classifier to the test topic weights (same stable softmax).
Z = np.dot(W, weights2) + b
Y_hat_test = np.exp(Z - Z.max(axis=0, keepdims=True))
Y_hat_test = Y_hat_test / np.sum(Y_hat_test, axis=0).reshape(1,-1)
# Classification accuracy: fraction of documents whose predicted argmax
# class matches the label's argmax.
train_accuracy = np.sum(np.argmax(Y_hat, axis=0) == np.argmax(YtrMat, axis=0)) / YtrMat.shape[1]
test_accuracy = np.sum(np.argmax(Y_hat_test, axis=0) == np.argmax(YteMat, axis=0)) / YteMat.shape[1]
print('Training accuacy is: ' ,np.round(train_accuracy, 4) * 100)
print('Testing accuacy is: ' , np.round(test_accuracy, 4) * 100)
# Cross-entropy loss per training epoch.
plt.figure(figsize=(8, 5))
plt.title('Convergence plot', fontsize = 20)
plt.plot(error_plot)
plt.show()
# %%shell
# jupyter nbconvert --to html /content/AG_MLSP_SP22_HW_6.ipynb